This notebook loads a Parquet-file dataset of crime records, computes monthly crime-rate summaries, and inserts the results into HBase.


In [1]:
from __future__ import division  # must come first; makes '/' true division (Python 2)
import pyspark
from pyspark.sql.functions import udf  # import only udf so builtins such as sum are not shadowed
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from collections import defaultdict
import pickle

In [2]:
# Load the Parquet dataset (Spark 1.x API) and derive a "month" column from the date column
old_df = sqlContext.parquetFile('./master')
month = udf(lambda date_time: date_time.month, IntegerType())
df = old_df.withColumn("month", month(old_df.date))
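
A quick sanity check, not part of the original notebook, to confirm the derived month column; the column names shown are taken from how df is used in the following cells.
In [ ]:
# Hedged sketch: inspect the schema and a few rows to verify the new "month" column
df.printSchema()
df.select("year", "month", "district", "primarytype").show(5)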

In [8]:
crime_district_df = df.select(df.year, df.month, df.district, df.primarytype)
# Register the DataFrame as a temporary SQL table using the given name
crime_district_df.registerTempTable("crime_district_df_table")
# Monthly crime counts per district and per primary crime type
column_district = sqlContext.sql("select year, month, district, count(*) as count \
        from crime_district_df_table group by year, month, district")
column_primaryType = sqlContext.sql("select year, month, primarytype, count(*) as count \
        from crime_district_df_table group by year, month, primarytype")
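
The same monthly counts could also be expressed with the DataFrame API instead of SQL; a minimal alternative sketch, assuming the columns selected above.
In [ ]:
# Hedged alternative to the SQL above: groupBy/count also produces a "count" column
column_district_alt = crime_district_df.groupBy("year", "month", "district").count()
column_primaryType_alt = crime_district_df.groupBy("year", "month", "primarytype").count()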

In [9]:
# collect() returns all the records as a list of Row objects on the driver
column1_dataframe = column_district.collect()
column2_dataframe = column_primaryType.collect()
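
Each collected element is a Row whose fields are used below; printing the first entries is an easy way to see their shape (illustrative, not part of the original notebook).
In [ ]:
# Illustrative: each element is a Row with year, month, district/primarytype, and count
print(column1_dataframe[0])
print(column2_dataframe[0])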

In [10]:
# Create two dictionaries as placeholders for the data; this builds the nested
# {row_key: {column: value}} structure expected by the starbase library
big_dict1 = {}
def myfunc(key1, key2, value):
    if key1 not in big_dict1:
        big_dict1[key1] = dict()
        big_dict1[key1][key2] = value
    elif key2 not in big_dict1[key1]:
        big_dict1[key1][key2] = value
big_dict2 = {}
def myfunc2(key1, key2, value):
    if key1 not in big_dict2:
        big_dict2[key1] = dict()
        big_dict2[key1][key2] = value
    elif key2 not in big_dict2[key1]:
        big_dict2[key1][key2] = value
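
The two helpers differ only in the dictionary they update; a hedged sketch of a single reusable version (the function name and example call are illustrative, not from the original notebook).
In [ ]:
# Hedged sketch: one generic helper, with the target dict passed in explicitly
def add_entry(target, key1, key2, value):
    # e.g. add_entry(big_dict1, '2014_7', some_district, some_count)
    target.setdefault(key1, {}).setdefault(key2, value)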

In [12]:
for entry in column1_dataframe:
    myfunc(str(entry.year)+'_'+str(entry.month), entry.district, entry.count)
for entry in column2_dataframe:
    myfunc2(str(entry.year)+'_'+str(entry.month), entry.primarytype, entry.count)
# Count the total number of crimes in each month
big_dict1_total = dict()
for year_month, value in big_dict1.items():
    subtotal = 0
    for district in value.keys():
        subtotal = subtotal + value[district]
    big_dict1_total[year_month] = subtotal
# Count crimes of all primary types in each month
big_dict2_total = dict()
for year_month, value in big_dict2.items():
    subtotal = 0
    for primarytype in value.keys():
        subtotal = subtotal + value[primarytype]
    big_dict2_total[year_month] = subtotal
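
The monthly totals above can also be written as dict comprehensions over the nested dictionaries; an equivalent, more compact form.
In [ ]:
# Equivalent one-liners for the monthly totals computed above
big_dict1_total = {ym: sum(counts.values()) for ym, counts in big_dict1.items()}
big_dict2_total = {ym: sum(counts.values()) for ym, counts in big_dict2.items()}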

In [14]:
# Convert each district's count into its share of that month's total crimes
for year_month, district_data in big_dict1.items():
    for dis in district_data.keys():
        big_dict1[year_month][dis] = big_dict1[year_month][dis]/big_dict1_total[year_month]
# Convert each primary type's count into its share of that month's total crimes
for year_month, primarytype_data in big_dict2.items():
    for primarytype in primarytype_data.keys():
        big_dict2[year_month][primarytype] = big_dict2[year_month][primarytype]/big_dict2_total[year_month]
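
A quick sanity check, not in the original notebook: the rates within each month should sum to roughly 1.
In [ ]:
# Hedged sanity check: each month's district rates should add up to about 1.0
for year_month, rates in big_dict1.items():
    assert abs(sum(rates.values()) - 1.0) < 1e-9, year_month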

In [ ]:
# Insert the monthly rates into HBase via the starbase REST client
from starbase import Connection
c = Connection()  # connect to the HBase REST (Stargate) server; adjust host/port for your cluster
t = c.table('ChicagoCrimeRecords')  # create an HBase table handle
t.create('District', 'Primarytype')  # create two column families
b = t.batch()  # batch operations work like normal insert/update, but are sent as one request
if b:
    # big_dict1 holds the per-district crime rates keyed by 'year_month'
    for row_key, district_rates in big_dict1.items():
        b.insert(row_key, {'District': district_rates})
    b.commit(finalize=True)

b = t.batch()
if b:
    # big_dict2 holds the per-primary-type crime rates keyed by 'year_month'
    for row_key, type_rates in big_dict2.items():
        b.insert(row_key, {'Primarytype': type_rates})
    b.commit(finalize=True)
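
To verify the writes, a row can be read back through the same REST client; a hedged sketch using starbase's fetch, with the sample key taken from the dictionary built above.
In [ ]:
# Hedged sketch: read one row back to confirm the batch insert landed
sample_key = next(iter(big_dict1))
print(t.fetch(sample_key))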